4Quant
Java User Group, 11 January 2016
| Mammography | Satellite |
|---|---|
| Brain Tumors | Throat Cancer |
Recent improvements allow you to:
[1] Mokso et al., J. Phys. D, 46(49),2013
Courtesy of M. Pistone at U. Bristol
If you looked at one 1000 x 1000 sized image every second, it would take you 139 hours to browse through a terabyte of data.
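The arithmetic behind this figure can be checked directly; a quick sketch, assuming 16-bit (2-byte) pixels, so a 1000 x 1000 image is 2 MB and a terabyte holds 500,000 of them:

```java
public class BrowseTime {
    // Estimate how long it takes to view 1 TB of 1000x1000 images,
    // one image per second, assuming 2 bytes per pixel (16-bit grayscale).
    public static double hoursPerTerabyte() {
        double bytesPerImage = 1000.0 * 1000.0 * 2.0; // 2 MB per image
        double imagesPerTB = 1e12 / bytesPerImage;    // 500,000 images
        return imagesPerTB / 3600.0;                  // one image/second -> hours
    }

    public static void main(String[] args) {
        System.out.printf("%.0f hours%n", hoursPerTerabyte()); // ~139 hours
    }
}
```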
| Year | Time to 1 TB | Manpower to keep up | Salary Costs / Month |
|---|---|---|---|
| 2000 | 4096 min | 2 people | 25 kCHF |
| 2008 | 1092 min | 8 people | 95 kCHF |
| 2014 | 32 min | 260 people | 3255 kCHF |
| 2016 | 2 min | 3906 people | 48828 kCHF |
Problems are normally approached by solving a single task as quickly as possible.
im_in = imread('test.jpg');          % read the image
im_filter = medfilt2(im_in, [5 5]);  % apply a 5x5 median filter
cl_img = bwlabel(im_filter > 10);    % threshold and label connected components
cl_count = hist(cl_img(:), 1:100);   % count pixels per component label
dlmwrite('out.txt', cl_count);       % write the counts to a file
You have to rewrite everything, every time.
If you start with a bad approach, it is very difficult to fix. Big data and reproducibility must be considered from the beginning.
// Copy the neighborhood around (x, y, z), clipped to the image bounds,
// into a (2 * nSize + 1)^3 array.
final int[][][] outMap = new int[nSize.x * 2 + 1][nSize.y * 2 + 1][nSize.z * 2 + 1];
int dz = 0;
for (int z2 = max(z - nSize.z, lowz); z2 <= min(z + nSize.z, uppz - 1); z2++, dz++) {
    int dy = 0;
    for (int y2 = max(y - nSize.y, lowy); y2 <= min(y + nSize.y, uppy - 1); y2++, dy++) {
        // linear offset of the first voxel in this row
        int off2 = (z2 * dim.y + y2) * dim.x + max(x - nSize.x, lowx);
        int dx = 0;
        for (int x2 = max(x - nSize.x, lowx); x2 <= min(x + nSize.x, uppx - 1); x2++, off2++, dx++) {
            outMap[dx][dy][dz] = distmap[off2];
        }
    }
}
return outMap;
Parallelism is when you can divide a task into separate pieces that can then be worked on at the same time.
Some tasks are easy to parallelize while others are very difficult. Rather than focusing on programming, real-life examples are good indicators of difficulty.
Distributed computing is very similar to parallel computing, but a bit more particular. Parallel means you process many tasks at the same time, while distributed means you are no longer on the same CPU, process, or even on the same machine.
Distribution has some important implications: once you are no longer on the same machine, variables like network delay, file-system issues, and other users become major problems.
The largest issue with parallel / distributed tasks is the need to access the same resources at the same time.
Parallel computing requires a significant amount of coordination between computers for tasks that are not easily parallelizable.
The second major issue is mutability: if you have two cores / computers trying to write the same information at the same time, the result is no longer deterministic (not good).
The simple act of taking turns and waiting for every independent process to take its turn can completely negate the benefits of parallel computing.
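A minimal sketch of the mutability problem in Java (the class and iteration counts are illustrative): two threads incrementing a plain shared `int` can lose updates, while an `AtomicInteger` serializes the increments and stays deterministic.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CounterRace {
    static int unsafeCount = 0;                                 // plain shared int
    static final AtomicInteger safeCount = new AtomicInteger(); // atomic shared counter

    // Run two threads that each increment both counters 100,000 times.
    public static void runRace() {
        Runnable work = () -> {
            for (int i = 0; i < 100_000; i++) {
                unsafeCount++;               // read-modify-write: updates can be lost
                safeCount.incrementAndGet(); // atomic: no updates lost
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start(); b.start();
        try {
            a.join(); b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        runRace();
        // safeCount is always 200000; unsafeCount is often less on multicore machines
        System.out.println(unsafeCount + " vs " + safeCount.get());
    }
}
```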
for (int i = 0; i < nCores; i++) {
if (bfArray[i] != null) {
try {
System.out.println("Joining " + bfArray[i]);
bfArray[i].join(); // wait until thread has finished
} catch (final InterruptedException e) {
System.out.println("ERROR - Thread : " + bfArray[i]
+ " was interrupted, proceed carefully!");
}
}
}
public synchronized void addBubble(final SeedLabel nBubble) {
    bubList.add(nBubble);
    completedBubbles++;
}
Inherits all of the problems of parallel programming with a whole variety of new issues.
If you have 1000 computers working on solving a problem and one fails, you do not want your whole job to crash.
How can you access and process data from many different computers quickly, without very expensive infrastructure?
The machine being used by the user, which is responsible for creating the jobs that need to run
Distributes task across multiple machines, coordinates communication between machines
The nodes where the computation is actually done
tell machine 1 to load images 0 to 2
tell machine 2 to load images 3 to 5
tell the machines to share the images to calculate the overlap
\[ \textrm{Transistors} \propto 2^{T/(\textrm{18 months})} \]
Based on data from https://gist.github.com/humberto-ortiz/de4b3a621602b78bf90d
There are now many more transistors inside a single computer but the processing speed hasn't increased. How can this be?
The figure shows the range of cloud costs (determined by peak usage) compared to a local workstation with utilization shown as the average number of hours the computer is used each week.
The figure shows the cost of a cloud-based solution as a percentage of the cost of buying a single machine. The values below 1 show the percentage as a number. The panels distinguish the average time to replacement for the machines, in months.
Directly coordinating tasks on a computer.
They look fairly similar, so what is the difference? The imperative version is needlessly complicated for one person, but what if you have a team? How can several people make an imperative soup faster (chopping vegetables together)?
How can many people make a declarative soup faster? Give everyone a different task (not completely efficient, since some tasks have to wait on others).
What took an entire PhD 3-4 years ago, can now be measured in a weekend, or even several seconds. Analysis tools have not kept up, are difficult to customize, and usually highly specific.
Data-structures that were fast and efficient for computers with 640kb of memory do not make sense anymore
CPUs are not getting much faster, but there are a lot more of them. Iterating through a huge array takes almost as long on 2014 hardware as on 2006 hardware.
The most important job for any piece of analysis is to be correct.
Almost all image processing tasks require a number of people to evaluate and implement them and are almost always moving targets
The last of the major priorities is speed, which covers scalability, raw performance, and development time.
Google ran into 'big data' and its associated problems years ago: peta- and exabytes of websites to collect and make sense of. Google uses an algorithm called PageRank™ for evaluating the quality of websites. They could probably have used existing tools if PageRank were some magic program that could read and determine the quality of a site:
for every_site_on_internet
current_site.rank=secret_pagerank_function(current_site)
end
Just divide all the websites into a bunch of groups and have each computer run a group, easy!
While the actual internals of PageRank are not public, the general idea is that sites are ranked based on how many sites link to them
for current_site in every_site_on_internet
current_pagerank = new SecretPageRankObj(current_site);
for other_site in every_site_on_internet
if current_site is_linked_to other_site
current_pagerank.add_site(other_site);
end
end
current_site.rank=current_pagerank.rank();
end
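The inner loop above can be sketched as a toy version in plain Java, counting inbound links for each site (the link graph and ranking-by-count are illustrative stand-ins for the real, secret algorithm):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyPageRank {
    // Rank each site by the number of other sites that link to it.
    public static Map<String, Integer> rankByInboundLinks(Map<String, List<String>> links) {
        Map<String, Integer> rank = new HashMap<>();
        for (String site : links.keySet()) rank.put(site, 0); // every site starts at 0
        for (List<String> targets : links.values())
            for (String target : targets)
                rank.merge(target, 1, Integer::sum);          // one more inbound link
        return rank;
    }

    public static void main(String[] args) {
        Map<String, List<String>> links = new HashMap<>();
        links.put("a", List.of("b", "c")); // a links to b and c
        links.put("b", List.of("c"));      // b links to c
        links.put("c", List.of());         // c links to nothing
        System.out.println(rankByInboundLinks(links)); // c has 2 inbound links
    }
}
```

The difficulty the slides point to is visible even here: ranking any one site requires looking at the links of every other site, so the task cannot simply be split into independent groups.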
How do you divide this task?
Some people claim to have had the idea before, but Google was certainly the first to do it at scale.
Several engineers at Google recognized common elements in many of the tasks being performed. They then proceeded to divide all tasks into two classes: Map and Reduce.
Map is where a function is applied to every element in the list and the function depends only on exactly that element \[ \vec{L} = \begin{bmatrix} 1,2,3,4,5 \end{bmatrix} \] \[ f(x) = x^2 \] \[ map(f \rightarrow \vec{L}) = \begin{bmatrix} 1,4,9,16,25 \end{bmatrix} \]
Reduce is more complicated and involves aggregating a number of different elements and summarizing them. For example the \( \Sigma \) function can be written as a reduce function \[ \vec{L} = \begin{bmatrix} 1,2,3,4,5 \end{bmatrix} \] \[ g(a,b) = a+b \] Reduce then applies the function to the first two elements, then to the result of the first two with the third, and so on until all the elements are done \[ reduce(g \rightarrow \vec{L}) = g(g(g(g(1,2),3),4),5) \]
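Both operations map directly onto Java 8 streams; this minimal sketch reproduces the two examples above on the list \( [1,2,3,4,5] \):

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class MapReduceDemo {
    public static void main(String[] args) {
        // Map: apply f(x) = x^2 to every element independently
        int[] squared = IntStream.rangeClosed(1, 5).map(x -> x * x).toArray();
        System.out.println(Arrays.toString(squared)); // [1, 4, 9, 16, 25]

        // Reduce: fold g(a, b) = a + b pairwise over the list
        int sum = IntStream.rangeClosed(1, 5).reduce(0, (a, b) -> a + b);
        System.out.println(sum); // 15
    }
}
```

Because the map function depends only on a single element, the map step can be split across as many workers as there are elements, which is exactly what the frameworks below exploit.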
They designed a framework for distributing and running these types of jobs on clusters. For each job, a dataset (\( \vec{L} \)), a Map-task (\( f \)), a grouping, and a Reduce-task (\( g \)) are specified.
All of the steps in between can be written once in a robust, safe manner and then reused for every task that can be described using this MapReduce paradigm. Such a task \( \langle \vec{L}, f(x), g(a,b) \rangle \) is referred to as a job.
The initial jobs were very basic; for more complicated jobs, a new notion of key-value (KV) pairs must be introduced. A KV pair is made up of a key and a value. A key must be comparable / hashable (a number, a string, an immutable list of numbers, etc.) and is used for grouping data. The value is the information associated with this key.
Using MapReduce on a folder full of text-documents.
L = ["cat dog car", "dog car dog"]
f(x) = [(word,1) for word in x.split(" ")]
\[ \downarrow \textbf{ Map } : f(x) \]
[("cat",1),("dog",1),("car",1),("dog",1),("car",1),("dog",1)]
\[ \downarrow \textrm{ Shuffle / Group} \]
"cat": (1)
"dog": (1,1,1)
"car": (1,1)
\[ \downarrow \textbf{ Reduce } : g(a,b) \]
[("cat",1),("dog",3),("car",2)]
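The whole word-count pipeline above fits in a few lines of plain Java, where `Collectors.groupingBy` plays the role of the shuffle / group step (class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCount {
    // Map: split each document into words; Shuffle: group by word;
    // Reduce: count the occurrences in each group.
    public static Map<String, Long> countWords(List<String> docs) {
        return docs.stream()
                .flatMap(doc -> Arrays.stream(doc.split(" ")))  // Map step
                .collect(Collectors.groupingBy(                 // Shuffle + Reduce
                        word -> word, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> docs = List.of("cat dog car", "dog car dog");
        System.out.println(countWords(docs)); // cat -> 1, dog -> 3, car -> 2
    }
}
```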
Hadoop is the open-source version of MapReduce, developed by Yahoo and released as an Apache project. It provides the underlying infrastructure and filesystem that handle storing and distributing data, so each machine stores some of the data locally and processing jobs run where the data is stored.
“The Ultimate Scala Collections”- Martin Odersky (EPFL / Creator of Scala)
Developed by the Algorithms, Machines, and People Lab at UC Berkeley in 2012
General tool for all Directed Acyclic Graph (DAG) workflows
Coarse-grained processing \( \rightarrow \) simple operations applied to entire sets
In-memory caching
Zaharia, M., et. al (2012). Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing
int[] newImage = new int[inputImage.length];
for(int i=0;i<inputImage.length; i++) {
if(inputImage[i]>0) {
newImage[i] = inputImage[i];
}
}
val newImage = inputImage.map(v => if(v>0) v else 0)
Looks the same as Scala, what is the difference?
val newImage = inputImage.map(v => if(v>0) v else 0)
It is being performed on a cluster, where the computation is not only divided between all available nodes but also locality-aware, meaning it moves the computation to where the data is.
A bit more verbose, but the idea remains the same.
RDD<Integer> newImage = inputImage.map(
    new Function<Integer, Integer>() {
        public Integer call(Integer x) {
            if (x > 0) return x;
            return 0;
        }
    });
Any possible problems?
These frameworks are really cool and Apache Spark has a lot of functionality, but I don't want to map-reduce a collection.
I want to
We have developed a number of commands for IQAE handling standard image processing tasks
Fully extensible with
First we start our cluster:
spark-ec2.py -s 50 launch 4quant-image-cluster
Load all of the samples (56TB of data)
loadImage(
"s3:/../brain_*_*_*/rec.tif"
)
\[ \downarrow \] A Resilient Distributed Dataset (RDD) \[ \textrm{Images}: \textrm{RDD}[((x,y,z),Img[Double])] =\\ \left[(\vec{x},\textrm{Img}),\cdots\right] \]
filteredImages = Images.gaussianFilter(1,0.5)
volFraction = Images.threshold(0.5).
map{keyImg =>
(sum(keyImg.img),keyImg.size)
}.reduce(_+_)
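On a single machine, the same volume-fraction computation can be sketched in plain Java (the 0.5 threshold and toy data are illustrative, and the class name is hypothetical):

```java
public class VolumeFraction {
    // Threshold the image and return {voxels above threshold, total voxels},
    // mirroring the (sum, size) pair produced per image in the RDD version.
    public static long[] thresholdCounts(double[] image, double threshold) {
        long above = 0;
        for (double v : image)
            if (v > threshold) above++;       // voxel counts toward the solid phase
        return new long[]{above, image.length};
    }

    public static void main(String[] args) {
        double[] image = {0.1, 0.7, 0.9, 0.2, 0.6};
        long[] counts = thresholdCounts(image, 0.5);
        System.out.println((double) counts[0] / counts[1]); // volume fraction 0.6
    }
}
```

In the distributed version the reduce step simply sums these per-image pairs across the whole 56 TB dataset.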
We have all of the filtered images and we want to stitch them together in a smart way.
pairImages = Images.
cartesian(Images).
filter((im1,im2) => dist(im1.pos,im2.pos)<1)
The cross correlation can then be executed on each pair of images from the new RDD (pairImages) by using the map command
displacementField = pairImages.
map{
((posA,ImgA), (posB,ImgB)) =>
xcorr(ImgA,ImgB,
in=posB-posA)
}
A smoothed displacement field can then be computed from the updated information provided by the cross correlations, by applying appropriate smoothing criteria across windows.
smoothField = displacementField.
window(3,3,3).
map(gaussianSmoothFunction)
This also ensures the original data is left unaltered and all analysis is reversible.
The final stitching can then be done by
alignImages.
filter(x=>abs(x-tPos)<img.size).
map { (x,img) =>
new Image(tSize).
copy(img,x,tPos)
}.combineImages()
\[ \downarrow \textrm{Translate to SQL} \]
SELECT COUNT(*) FROM
(SELECT SHAPE_ANALYSIS(LABEL_NUCLEI(pathology_slide)) FROM patients
WHERE disease LIKE "myeloma")
WHERE anisotropy > 0.75
\[ \downarrow \textrm{Load Myeloma Data Subset} \]
\[ \downarrow \textrm{Perform analysis on every image} \]
\[ \downarrow \textrm{Filter out the most anisotropic cells} \]
Find all strongly reflective large objects within 1 km of the Paul Scherrer Institute, Villigen, CH
\[ \downarrow \textrm{Translate to SQL} \]
SELECT contour FROM (
SELECT COMPONENT_LABEL(THRESHOLD(tile,200)) FROM esriTiles
WHERE DIST(LAT,47.53000992328762,LONG,8.215198516845703)<1
) WHERE area>200
We can then visualize these contours easily
or apply them back to the original map
ETH Spinoff - 4Quant: From images to insight
While our framework is commercial, we build on top of and integrate with open-source tools, so that the research is reproducible by anyone, anywhere, using any machine (it just might take a bit longer).
We also try to publish as much code, educational material, and samples as possible on our github account.

We are interested in partnerships and collaborations
The bottleneck is the filesystem connection: many nodes (10+) reading in parallel bring even a GPFS-based InfiniBand system to a crawl.
One of the central tenets of MapReduce™ is data-centric computation \( \rightarrow \) instead of moving data to the computation, move the computation to the data.